初识 Elastic Job

序言

  我们都听过愚公移山的故事,对个人来讲,移走一座山需要耗费的时间无疑是十分漫长的。因此,愚公找亲朋好友帮忙一起挖山,最终感动了天帝,天帝叫部下施展了法力将太行、王屋山移走。
  愚公不愚,知道将任务拆解,分而治之。
  那么,在分布式任务中,我么又何须使用传统的定时任务框架去完整的执行一个任务呢?
  现在,让我们使用起 Elastic Job 吧!

简介

  Elastic-Job-Lite 定位为轻量级无中心化解决方案,使用 jar 包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅 Zookeeper。

  Elastic-Job 是一个分布式调度解决方案,由两个相互独立的子项目 Elastic-Job-Lite 和 Elastic-Job-Cloud 组成。

基本概念

分片

  任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

  例如:有一个遍历数据库某张表的作业,现有 2 台服务器。
  为了快速的执行作业,那么每台服务器应执行作业的 50% 。
  为满足此需求,可将作业分成 2 片,每台服务器执行 1 片。
  因此,作业遍历数据的逻辑应为:

  • 服务器 A 遍历 ID 以奇数结尾的数据;
  • 服务器 B 遍历 ID 以偶数结尾的数据

  如果分成10片,则作业遍历数据的逻辑应为每片分到的分片项应为 ID % 10,因此:

  • 服务器 A 被分配到分片项 0,1,2,3,4
  • 服务器 B 被分配到分片项 5,6,7,8,9

  直接的结果就是服务器 A 遍历 ID 以 0-4 结尾的数据;服务器 B 遍历 ID 以 5-9 结尾的数据。

分片项与业务处理解耦

  Elastic-Job 并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

  为此,Elastic-Job 中使用个性化参数来处理这种关系。

个性化参数及适用场景

  个性化参数,即 shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码

  例如:按照地区水平拆分数据库,数据库 A 是北京的数据;数据库 B 是上海的数据;数据库 C 是广州的数据。
  如果仅按照分片项配置,开发者需要了解:

  • 0 表示北京
  • 1 表示上海
  • 2 表示广州

  合理使用个性化参数可以让代码更可读,如果配置为 0 = 北京,1 =上海,2 = 广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

  简而言之,分片数字和个性化参数可以形成一个映射关心。

核心理念

分布式调度

  Elastic-Job-Lite 并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

  注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

作业高可用

  Elastic-Job-Lite 提供最安全的方式执行作业。
  若将分片总数设置为 1,并使用多于 1 台的服务器执行作业,那么作业将会以 1 主 n 从的方式执行。

  一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。

:开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

资源最大化

  Elastic-Job-Lite 提供灵活的配置方式,可以最大限度的提高执行作业的吞吐量。
  对于分片项的设置,应该大于服务器的数量,且最好是大于服务器倍数的数量,此时作业将会合理的利用分布式资源,动态的分配分片项。

  举个栗子:3 台服务器,分片项为 10 片,则分片项分配结果为:

  • 服务器 A = 0,1,2
  • 服务器 B = 3,4,5
  • 服务器 C = 6,7,8,9

  若服务器 C 崩溃,则分片项分配结果为:

  • 服务器 A = 0,1,2,3,4
  • 服务器 B = 5,6,7,8,9

  在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

快速入门

  ① 首先在pom.xml添加相关依赖:

1
2
3
4
5
<!-- 版本配置 -->
<properties>
<java.version>1.8</java.version>
<elastic.job.version>2.1.5</elastic.job.version>
</properties>

1
2
3
4
5
6
7
8
9
10
11
12
<!--调度框架-->
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic.job.version}</version>
</dependency>

<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic.job.version}</version>
</dependency>

  ② 配置 ZooKeeper 的相关参数信息(如服务器地址、命名空间):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 调度框架 ZooKeeper 配置
*
* @author lovike
* @since 2020/6/26
*/
@Configuration
@ConditionalOnExpression("'${elasticjob.zookeeper.host}'.length() > 0")
public class JobZookeeperRegistryCenterConfig {

@Bean(initMethod = "init")
public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${elasticjob.zookeeper.host}") final String serverList,
@Value("${elasticjob.zookeeper.namespace}") final String namespace) {
return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList, namespace));
}
}

  ③ 创建简单作业(比如一个监控接口超时报警的作业):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* @author lovike
* @description: 监控接口超时报警作业
* @since 2020-06-26
*/
@Slf4j
@Component
public class WatchApiTimeOutJob implements SimpleJob {
// 配置的允许超时的数量
@Value("${elasticJob.apitimeout.rule.count}")
private int count;
// 指定时间范围
@Value("${elasticJob.apitimeout.rule.scope}")
private int scope;

@Autowired
ApiTimeResultMapper apiTimeResultMapper;

@Override
public void execute(ShardingContext shardingContext) {
Integer item = shardingContext.getShardingItem();
Integer totalCount = shardingContext.getShardingTotalCount();
log.info("开始执行 elastic-job,任务名称:WatchApiTimeOut,总分片数:{},当前分片数:{}", totalCount, item);
LocalDateTime endTime = LocalDateTime.now();
// 配置时间范围:小时
LocalDateTime startTime = endTime.minusHours(scope);
Long startTimestamp = startTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
Long endTimestamp = endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

// 从数据库获取一段时间内响应时间超过 1s(该时间与 sql 列有关)以上的接口总数,若其到达配置的某个值, 报警
// 一段时间内:(结束是当前时间,开始时间是当前时间减去可配置的时间)

int apiTimeoutCount = apiTimeResultMapper.selectApiTimeoutCount(new Date(startTimestamp), new Date(endTimestamp));
if (apiTimeoutCount >= count) {
log.info("报警拉,允许接口超时数量:{},当前接口超时数:{}", count, apiTimeoutCount);
}
log.info("执行结束 elastic-job,任务名称:WatchApiTimeOut,总分片数:{},当前分片数: {}", totalCount, item);
}
}

  ④ 配置作业的相关参数信息并启动作业:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author lovike
* @description: 监控接口超时的配置
* @since 2020-06-26
*/
@Configuration
public class WatchApiTimeOutConfig {

@Resource
private ZookeeperRegistryCenter zookeeperRegistryCenter;

@Autowired
public WatchApiTimeOutJob watchApiTimeOutJob;


private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
final String cron,
final int shardingTotalCount,
final Boolean enabled) {
// 根据传入的参数构建 JobCoreConfiguration
JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
jobClass.getName(),
cron,
shardingTotalCount).failover(true).build();

// 根据 JobCoreConfiguration 构建 SimpleJobConfiguration
SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
jobCoreConfiguration, jobClass.getCanonicalName());

return LiteJobConfiguration.newBuilder(simpleJobConfiguration).disabled(!enabled)
.monitorExecution(true).overwrite(true).build();
}

@Bean(initMethod = "init", name = "ApiTimeOutJobScheduler")
public JobScheduler simpleJobScheduler(@Value("${elasticJob.apitimeout.cron}") final String cron,
@Value("${elasticJob.apitimeout.shardingTotalCount}") final int shardingTotalCount,
@Value("${elasticJob.apitimeout.enabled}") final Boolean enabled) {
// 使用配置文件参数构建 LiteJobConfiguration
LiteJobConfiguration liteJobConfiguration = getLiteJobConfiguration(
watchApiTimeOutJob.getClass(),
cron,
shardingTotalCount,
enabled);

return new SpringJobScheduler(watchApiTimeOutJob, zookeeperRegistryCenter, liteJobConfiguration);
}
}

  下面是application.properties的所有配置信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# elastic-job zookeeper address
elasticjob.zookeeper.host=xx.xx.xx.xx:2181
elasticjob.zookeeper.namespace=elasticjob-job-lite-apiwatch

# 定时器执行时间
elasticJob.apitimeout.cron= 0/10 * * * * ?
# 分片总数
elasticJob.apitimeout.shardingTotalCount=1
# 是否启动
elasticJob.apitimeout.enabled=true
# 允许接口数
elasticJob.apitimeout.rule.count=10
# 时间设置
elasticJob.apitimeout.rule.scope=24

参考

文章信息

时间 说明
2020-06-03 初版
2020-11-19 序言
0%